import 'dart:async';

import 'package:async/async.dart';

import '../common/test_page.dart';

/// Test page that exercises `StreamSplitter` from `package:async`.
///
/// All cases are registered from the constructor through the `test` and
/// `expect` helpers, which are declared outside this file (presumably on
/// [TestPage] — confirm there). `expect` is always called with a single
/// argument: the observed value. Wherever that value is produced
/// asynchronously, the case awaits it first so the helper receives the actual
/// data rather than a pending `Future`.
class StreamSplitterTestPage extends TestPage {
  StreamSplitterTestPage(super.title) {
    late StreamController<int> controller;
    late StreamSplitter<int> splitter;

    // Gives a case a fresh source controller and a splitter over its stream.
    // Called by hand as the first statement of every test body.
    void setUp() {
      controller = StreamController<int>();
      splitter = StreamSplitter<int>(controller.stream);
    }

    // A branch is created and listened to before the source emits anything;
    // the collected events are reported after each of three adds.
    test("StreamSplitter().split()在流开始重播之前创建的分支", () async {
      setUp();
      final events = <int>[];
      final branch = splitter.split();
      splitter.close();
      branch.listen(events.add);

      controller.add(1);
      await flushMicrotasks();
      expect(events);

      controller.add(2);
      await flushMicrotasks();
      expect(events);

      controller.add(3);
      await flushMicrotasks();
      expect(events);

      controller.close();
    });

    // Data, error, data, then close are pushed into the source before the
    // branch is listened to; each callback reports the running event count
    // together with the value or error it received.
    test('StreamSplitter().split()分支重放错误事件和数据事件', () async {
      setUp();
      final branch = splitter.split();
      splitter.close();

      controller.add(1);
      controller.addError('error');
      controller.add(3);
      controller.close();

      var count = 0;
      final finished = Completer<void>();
      branch.listen((value) {
        expect(count);
        expect(value);
        count++;
      }, onError: (error) {
        expect(count);
        expect(error);
        count++;
      }, onDone: () {
        expect(count);
        finished.complete();
      });

      // Every expect above runs inside a stream callback, so keep the test
      // body alive until the branch has delivered its done event.
      await finished.future;
    });

    // The branch is created after two events were added to the source and
    // before two more are added; the full list it yields is reported.
    test("在流中间创建的分支会回放它", () async {
      setUp();
      controller.add(1);
      controller.add(2);
      await flushMicrotasks();

      final branch = splitter.split();
      splitter.close();

      controller.add(3);
      controller.add(4);
      controller.close();

      expect(await branch.toList());
    });

    // The branch is created only after the source controller was closed.
    test("流完成后创建的分支会重播它", () async {
      setUp();
      controller.add(1);
      controller.add(2);
      controller.add(3);
      controller.close();
      await flushMicrotasks();

      expect(await splitter.split().toList());
      splitter.close();
    });

    // Reports the branch's `isBroadcast` flag, listens once, and then hands
    // `expect` two closures that would each listen again.
    // NOTE(review): whether `expect` invokes those closures depends on the
    // helper's implementation — confirm.
    test('创建单个订阅分支', () async {
      setUp();
      final branch = splitter.split();
      expect(branch.isBroadcast);
      branch.listen(null);
      expect(() => branch.listen(null));
      expect(() => branch.listen(null));
    });

    // Three branches are created at different points in the source's life:
    // before any event, after two events, and after the close.
    test('多个分支每个都重播流', () async {
      setUp();
      final branch1 = splitter.split();
      controller.add(1);
      controller.add(2);
      await flushMicrotasks();

      final branch2 = splitter.split();
      controller.add(3);
      controller.close();
      await flushMicrotasks();

      final branch3 = splitter.split();
      splitter.close();

      // Subscribe to all three branches before awaiting any of them, so the
      // listen order matches the original un-awaited calls.
      final replay1 = branch1.toList();
      final replay2 = branch2.toList();
      final replay3 = branch3.toList();
      expect(await replay1);
      expect(await replay2);
      expect(await replay3);
    });

    // `closed` flips once `branch.last` completes; it is reported before and
    // after the source controller is closed.
    test("直到源流关闭，分支才会关闭", () async {
      setUp();
      final branch = splitter.split();
      splitter.close();

      var closed = false;
      branch.last.then((_) => closed = true);

      controller.add(1);
      controller.add(2);
      controller.add(3);
      await flushMicrotasks();
      expect(closed);

      controller.close();
      await flushMicrotasks();
      expect(closed);
    });

    // Reports `controller.hasListener` at three points: before any branch
    // exists, after a branch is split off, and after that branch is listened
    // to.
    test("直到分支", () async {
      setUp();
      expect(controller.hasListener);

      final branch = splitter.split();
      splitter.close();
      await flushMicrotasks();
      expect(controller.hasListener);

      branch.listen(null);
      await flushMicrotasks();
      expect(controller.hasListener);
    });

    // Pauses three branch subscriptions one after another, then resumes one,
    // reporting `controller.isPaused` after each step.
    test('StreamSplitter().split().listen().pause() .resume()当所有分支暂停时，源流暂停', () async {
      setUp();
      final branch1 = splitter.split();
      final branch2 = splitter.split();
      final branch3 = splitter.split();
      splitter.close();

      final subscription1 = branch1.listen(null);
      final subscription2 = branch2.listen(null);
      final subscription3 = branch3.listen(null);

      subscription1.pause();
      await flushMicrotasks();
      expect(controller.isPaused);

      subscription2.pause();
      await flushMicrotasks();
      expect(controller.isPaused);

      subscription3.pause();
      await flushMicrotasks();
      expect(controller.isPaused);

      subscription2.resume();
      await flushMicrotasks();
      expect(controller.isPaused);
    });

    // Cancels three branch subscriptions while the splitter is still open,
    // then splits off and listens to a fourth branch, reporting
    // `controller.isPaused` after each step.
    test('StreamSplitter().split().listen().cancel() .close()取消所有分支时，源流将暂停', () async {
      setUp();
      final branch1 = splitter.split();
      final branch2 = splitter.split();
      final branch3 = splitter.split();

      final subscription1 = branch1.listen(null);
      final subscription2 = branch2.listen(null);
      final subscription3 = branch3.listen(null);

      subscription1.cancel();
      await flushMicrotasks();
      expect(controller.isPaused);

      subscription2.cancel();
      await flushMicrotasks();
      expect(controller.isPaused);

      subscription3.cancel();
      await flushMicrotasks();
      expect(controller.isPaused);

      final branch4 = splitter.split();
      splitter.close();
      await flushMicrotasks();
      expect(controller.isPaused);

      branch4.listen(null);
      await flushMicrotasks();
      expect(controller.isPaused);
    });

    // Cancels every branch first and closes the splitter afterwards,
    // reporting `controller.hasListener` after each step.
    test('取消所有分支后关闭源流时，源流将被取消', () async {
      setUp();
      final branch1 = splitter.split();
      final branch2 = splitter.split();
      final branch3 = splitter.split();

      final subscription1 = branch1.listen(null);
      final subscription2 = branch2.listen(null);
      final subscription3 = branch3.listen(null);

      subscription1.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);

      subscription2.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);

      subscription3.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);

      splitter.close();
      expect(controller.hasListener);
    });

    // Same as the previous case, but the splitter is closed before the
    // branches are listened to and cancelled.
    test('源流在关闭后取消所有分支时被取消', () async {
      setUp();
      final branch1 = splitter.split();
      final branch2 = splitter.split();
      final branch3 = splitter.split();
      splitter.close();

      final subscription1 = branch1.listen(null);
      final subscription2 = branch2.listen(null);
      final subscription3 = branch3.listen(null);

      subscription1.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);

      subscription2.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);

      subscription3.cancel();
      await flushMicrotasks();
      expect(controller.hasListener);
    });

    // Closes the splitter without ever splitting, then listens to the source
    // stream directly and reports `controller.hasListener`.
    test('在添加任何分支之前关闭的拆分器永远不会侦听源流', () {
      setUp();
      splitter.close();

      controller.stream.listen(null);
      expect(controller.hasListener);
    });

    // Splits the source into five branches with the static helper and
    // reports the list each branch yields.
    test('StreamSplitter.splitFrom()将源流拆分为指定数量的分支', () async {
      setUp();
      final branches = StreamSplitter.splitFrom(controller.stream, 5);

      controller.add(1);
      controller.add(2);
      controller.add(3);
      controller.close();

      // Subscribe to every branch up front, then report the results in
      // branch order once all of them have finished.
      final replays = [for (final branch in branches) branch.toList()];
      for (final values in await Future.wait(replays)) {
        expect(values);
      }
    });
  }
}

/// Returns a future that completes once a zero-length timer has fired.
///
/// Timer events are handled only after the microtask queue has drained, so
/// awaiting the result lets every microtask that is already scheduled run
/// first.
Future flushMicrotasks() {
  return Future.delayed(Duration.zero);
}